"""
The ``mlflow.projects`` module provides an API for running MLflow projects locally or remotely.
"""

from __future__ import print_function

from distutils import dir_util
import hashlib
import json
import yaml
import os
import sys
import shutil
from six.moves import urllib
import subprocess
import tempfile
import logging
import posixpath
import docker

import mlflow.projects.databricks
import mlflow.tracking as tracking
import mlflow.tracking.fluent as fluent
from mlflow.entities import RunStatus, SourceType
from mlflow.exceptions import ExecutionException, MlflowException
from mlflow.projects import _project_spec
from mlflow.projects.submitted_run import LocalSubmittedRun, SubmittedRun
from mlflow.projects.utils import (
    _is_file_uri, _is_local_uri, _is_valid_branch_name, _is_zip_uri,
    _get_git_repo_url, _parse_subdirectory, _expand_uri, _get_storage_dir,
    _GIT_URI_REGEX,
)
from mlflow.tracking.context.default_context import _get_user
from mlflow.tracking.context.git_context import _get_git_commit
from mlflow.tracking.fluent import _get_experiment_id
from mlflow.store.artifact.artifact_repository_registry import get_artifact_repository
from mlflow.store.artifact.azure_blob_artifact_repo import AzureBlobArtifactRepository
from mlflow.store.artifact.gcs_artifact_repo import GCSArtifactRepository
from mlflow.store.artifact.hdfs_artifact_repo import HdfsArtifactRepository
from mlflow.store.artifact.local_artifact_repo import LocalArtifactRepository
from mlflow.store.artifact.s3_artifact_repo import S3ArtifactRepository
from mlflow.utils import databricks_utils, file_utils, process
from mlflow.utils.file_utils import path_to_local_sqlite_uri, path_to_local_file_uri
from mlflow.utils.mlflow_tags import (
    MLFLOW_PROJECT_ENV, MLFLOW_DOCKER_IMAGE_URI, MLFLOW_DOCKER_IMAGE_ID, MLFLOW_USER,
    MLFLOW_SOURCE_NAME, MLFLOW_SOURCE_TYPE, MLFLOW_GIT_COMMIT, MLFLOW_GIT_REPO_URL,
    MLFLOW_GIT_BRANCH, LEGACY_MLFLOW_GIT_REPO_URL, LEGACY_MLFLOW_GIT_BRANCH_NAME,
    MLFLOW_PROJECT_ENTRY_POINT, MLFLOW_PARENT_RUN_ID, MLFLOW_PROJECT_BACKEND,
)
from mlflow.utils.uri import get_db_profile_from_uri, is_databricks_uri

# Environment variable indicating a path to a conda installation. MLflow will default to running
# "conda" if unset
MLFLOW_CONDA_HOME = "MLFLOW_CONDA_HOME"
_GENERATED_DOCKERFILE_NAME = "Dockerfile.mlflow-autogenerated"
_PROJECT_TAR_ARCHIVE_NAME = "mlflow-project-docker-build-context"
_MLFLOW_DOCKER_TRACKING_DIR_PATH = "/mlflow/tmp/mlruns"
_MLFLOW_DOCKER_WORKDIR_PATH = "/mlflow/projects/code/"

_logger = logging.getLogger(__name__)


def _resolve_experiment_id(experiment_name=None, experiment_id=None):
    """
    Resolve experiment.

    Verifies either one or other is specified - cannot be both selected.

    If ``experiment_name`` is provided and does not exist, an experiment
    of that name is created and its id is returned.

    :param experiment_name: Name of experiment under which to launch the run.
    :param experiment_id: ID of experiment under which to launch the run.
    :return: str
    """

    if experiment_name and experiment_id:
        raise MlflowException("Specify only one of 'experiment_name' or 'experiment_id'.")

    if experiment_id:
        return str(experiment_id)

    if experiment_name:
        client = tracking.MlflowClient()
        exp = client.get_experiment_by_name(experiment_name)
        if exp:
            return exp.experiment_id
        else:
            print("INFO: '{}' does not exist. Creating a new experiment".format(experiment_name))
            return client.create_experiment(experiment_name)

    return _get_experiment_id()


def _run(uri, experiment_id, entry_point="main", version=None, parameters=None,
         backend=None, backend_config=None, use_conda=True,
         storage_dir=None, synchronous=True, run_id=None):
    """
    Helper that delegates to the project-running method corresponding to the passed-in backend.
    Returns a ``SubmittedRun`` corresponding to the project run.
    """

    parameters = parameters or {}
    work_dir = _fetch_project(uri=uri, force_tempdir=False, version=version)
    project = _project_spec.load_project(work_dir)
    _validate_execution_environment(project, backend)
    project.get_entry_point(entry_point)._validate_parameters(parameters)
    if run_id:
        active_run = tracking.MlflowClient().get_run(run_id)
    else:
        active_run = _create_run(uri, experiment_id, work_dir, entry_point)

    # Consolidate parameters for logging.
    # `storage_dir` is `None` since we want to log actual path not downloaded local path
    entry_point_obj = project.get_entry_point(entry_point)
    final_params, extra_params = entry_point_obj.compute_parameters(parameters, storage_dir=None)
    for key, value in (list(final_params.items()) + list(extra_params.items())):
        tracking.MlflowClient().log_param(active_run.info.run_id, key, value)

    repo_url = _get_git_repo_url(work_dir)
    if repo_url is not None:
        for tag in [MLFLOW_GIT_REPO_URL, LEGACY_MLFLOW_GIT_REPO_URL]:
            tracking.MlflowClient().set_tag(active_run.info.run_id, tag, repo_url)

    # Add branch name tag if a branch is specified through -version
    if _is_valid_branch_name(work_dir, version):
        for tag in [MLFLOW_GIT_BRANCH, LEGACY_MLFLOW_GIT_BRANCH_NAME]:
            tracking.MlflowClient().set_tag(active_run.info.run_id, tag, version)

    if backend == "databricks":
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_BACKEND,
                                        "databricks")
        from mlflow.projects.databricks import run_databricks
        return run_databricks(
            remote_run=active_run,
            uri=uri, entry_point=entry_point, work_dir=work_dir, parameters=parameters,
            experiment_id=experiment_id, cluster_spec=backend_config)

    elif backend == "local" or backend is None:
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "local")
        command_args = []
        command_separator = " "
        # If a docker_env attribute is defined in MLproject then it takes precedence over conda yaml
        # environments, so the project will be executed inside a docker container.
        if project.docker_env:
            tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV,
                                            "docker")
            _validate_docker_env(project)
            _validate_docker_installation()
            image = _build_docker_image(work_dir=work_dir,
                                        repository_uri=project.name,
                                        base_image=project.docker_env.get('image'),
                                        run_id=active_run.info.run_id)
            command_args += _get_docker_command(image=image, active_run=active_run,
                                                volumes=project.docker_env.get("volumes"),
                                                user_env_vars=project.docker_env.get("environment"))
        # Synchronously create a conda environment (even though this may take some time)
        # to avoid failures due to multiple concurrent attempts to create the same conda env.
        elif use_conda:
            tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "conda")
            command_separator = " && "
            conda_env_name = _get_or_create_conda_env(project.conda_env_path)
            command_args += _get_conda_command(conda_env_name)
        # In synchronous mode, run the entry point command in a blocking fashion, sending status
        # updates to the tracking server when finished. Note that the run state may not be
        # persisted to the tracking server if interrupted
        if synchronous:
            command_args += _get_entry_point_command(project, entry_point, parameters, storage_dir)
            command_str = command_separator.join(command_args)
            return _run_entry_point(command_str, work_dir, experiment_id,
                                    run_id=active_run.info.run_id)
        # Otherwise, invoke `mlflow run` in a subprocess
        return _invoke_mlflow_run_subprocess(
            work_dir=work_dir, entry_point=entry_point, parameters=parameters,
            experiment_id=experiment_id,
            use_conda=use_conda, storage_dir=storage_dir, run_id=active_run.info.run_id)
    elif backend == "kubernetes":
        from mlflow.projects import kubernetes as kb
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "docker")
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_BACKEND,
                                        "kubernetes")
        _validate_docker_env(project)
        _validate_docker_installation()
        kube_config = _parse_kubernetes_config(backend_config)
        image = _build_docker_image(work_dir=work_dir,
                                    repository_uri=kube_config["repository-uri"],
                                    base_image=project.docker_env.get('image'),
                                    run_id=active_run.info.run_id)
        image_digest = kb.push_image_to_registry(image.tags[0])
        submitted_run = kb.run_kubernetes_job(
            project.name,
            active_run,
            image.tags[0],
            image_digest,
            _get_entry_point_command(project, entry_point, parameters, storage_dir),
            _get_run_env_vars(
                run_id=active_run.info.run_uuid,
                experiment_id=active_run.info.experiment_id
            ),
            kube_config.get('kube-context', None),
            kube_config['kube-job-template']
        )
        return submitted_run

    supported_backends = ["local", "databricks", "kubernetes"]
    raise ExecutionException("Got unsupported execution mode %s. Supported "
                             "values: %s" % (backend, supported_backends))


def run(uri, entry_point="main", version=None, parameters=None,
        experiment_name=None, experiment_id=None,
        backend=None, backend_config=None, use_conda=True,
        storage_dir=None, synchronous=True, run_id=None):
    """
    Run an MLflow project. The project can be local or stored at a Git URI.

    You can run the project locally or remotely on a Databricks.

    For information on using this method in chained workflows, see `Building Multistep Workflows
    <../projects.html#building-multistep-workflows>`_.

    :raises: :py:class:`mlflow.exceptions.ExecutionException` If a run launched in blocking mode
             is unsuccessful.

    :param uri: URI of project to run. A local filesystem path
                or a Git repository URI (e.g. https://github.com/mlflow/mlflow-example)
                pointing to a project directory containing an MLproject file.
    :param entry_point: Entry point to run within the project. If no entry point with the specified
                        name is found, runs the project file ``entry_point`` as a script,
                        using "python" to run ``.py`` files and the default shell (specified by
                        environment variable ``$SHELL``) to run ``.sh`` files.
    :param version: For Git-based projects, either a commit hash or a branch name.
    :param experiment_name: Name of experiment under which to launch the run.
    :param experiment_id: ID of experiment under which to launch the run.
    :param backend: Execution backend for the run: "local", "databricks", or "kubernetes"
                    (experimental). If running against Databricks, will run against a Databricks
                    workspace determined as follows: if a Databricks tracking URI of the form
                    ``databricks://profile`` has been set (e.g. by setting the
                    MLFLOW_TRACKING_URI environment variable), will run against the workspace
                    specified by <profile>. Otherwise, runs against the workspace specified by
                    the default Databricks CLI profile.
    :param backend_config: A dictionary, or a path to a JSON file (must end in '.json'), which will
                           be passed as config to the backend. The exact content which should be
                           provided is different for each execution backend and is documented
                           at https://www.mlflow.org/docs/latest/projects.html.
    :param use_conda: If True (the default), create a new Conda environment for the run and
                      install project dependencies within that environment. Otherwise, run the
                      project in the current environment without installing any project
                      dependencies.
    :param storage_dir: Used only if ``backend`` is "local". MLflow downloads artifacts from
                        distributed URIs passed to parameters of type ``path`` to subdirectories of
                        ``storage_dir``.
    :param synchronous: Whether to block while waiting for a run to complete. Defaults to True.
                        Note that if ``synchronous`` is False and ``backend`` is "local", this
                        method will return, but the current process will block when exiting until
                        the local run completes. If the current process is interrupted, any
                        asynchronous runs launched via this method will be terminated. If
                        ``synchronous`` is True and the run fails, the current process will
                        error out as well.
    :param run_id: Note: this argument is used internally by the MLflow project APIs and should
                   not be specified. If specified, the run ID will be used instead of
                   creating a new run.
    :return: :py:class:`mlflow.projects.SubmittedRun` exposing information (e.g. run ID)
             about the launched run.
    """

    cluster_spec_dict = backend_config
    if (backend_config and type(backend_config) != dict
            and os.path.splitext(backend_config)[-1] == ".json"):
        with open(backend_config, 'r') as handle:
            try:
                cluster_spec_dict = json.load(handle)
            except ValueError:
                _logger.error(
                    "Error when attempting to load and parse JSON cluster spec from file %s",
                    backend_config)
                raise

    if backend == "databricks":
        mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config)

    experiment_id = _resolve_experiment_id(experiment_name=experiment_name,
                                           experiment_id=experiment_id)

    submitted_run_obj = _run(
        uri=uri, experiment_id=experiment_id, entry_point=entry_point, version=version,
        parameters=parameters, backend=backend, backend_config=cluster_spec_dict,
        use_conda=use_conda, storage_dir=storage_dir, synchronous=synchronous, run_id=run_id)
    if synchronous:
        _wait_for(submitted_run_obj)
    return submitted_run_obj


def _wait_for(submitted_run_obj):
    """Wait on the passed-in submitted run, reporting its status to the tracking server."""
    run_id = submitted_run_obj.run_id
    active_run = None
    # Note: there's a small chance we fail to report the run's status to the tracking server if
    # we're interrupted before we reach the try block below
    try:
        active_run = tracking.MlflowClient().get_run(run_id) if run_id is not None else None
        if submitted_run_obj.wait():
            _logger.info("=== Run (ID '%s') succeeded ===", run_id)
            _maybe_set_run_terminated(active_run, "FINISHED")
        else:
            _maybe_set_run_terminated(active_run, "FAILED")
            raise ExecutionException("Run (ID '%s') failed" % run_id)
    except KeyboardInterrupt:
        _logger.error("=== Run (ID '%s') interrupted, cancelling run ===", run_id)
        submitted_run_obj.cancel()
        _maybe_set_run_terminated(active_run, "FAILED")
        raise


def _fetch_project(uri, force_tempdir, version=None):
    """
    Fetch a project into a local directory, returning the path to the local project directory.
    :param force_tempdir: If True, will fetch the project into a temporary directory. Otherwise,
                          will fetch ZIP or Git projects into a temporary directory but simply
                          return the path of local projects (i.e. perform a no-op for local
                          projects).
    """
    parsed_uri, subdirectory = _parse_subdirectory(uri)
    use_temp_dst_dir = force_tempdir or _is_zip_uri(parsed_uri) or not _is_local_uri(parsed_uri)
    dst_dir = tempfile.mkdtemp() if use_temp_dst_dir else parsed_uri
    if use_temp_dst_dir:
        _logger.info("=== Fetching project from %s into %s ===", uri, dst_dir)
    if _is_zip_uri(parsed_uri):
        if _is_file_uri(parsed_uri):
            parsed_file_uri = urllib.parse.urlparse(urllib.parse.unquote(parsed_uri))
            parsed_uri = os.path.join(parsed_file_uri.netloc, parsed_file_uri.path)
        _unzip_repo(zip_file=(
            parsed_uri if _is_local_uri(parsed_uri) else _fetch_zip_repo(parsed_uri)),
            dst_dir=dst_dir)
    elif _is_local_uri(uri):
        if version is not None:
            raise ExecutionException("Setting a version is only supported for Git project URIs")
        if use_temp_dst_dir:
            dir_util.copy_tree(src=parsed_uri, dst=dst_dir)
    else:
        assert _GIT_URI_REGEX.match(parsed_uri), "Non-local URI %s should be a Git URI" % parsed_uri
        _fetch_git_repo(parsed_uri, version, dst_dir)
    res = os.path.abspath(os.path.join(dst_dir, subdirectory))
    if not os.path.exists(res):
        raise ExecutionException("Could not find subdirectory %s of %s" % (subdirectory, dst_dir))
    return res


def _unzip_repo(zip_file, dst_dir):
    import zipfile
    with zipfile.ZipFile(zip_file) as zip_in:
        zip_in.extractall(dst_dir)


def _fetch_zip_repo(uri):
    import requests
    from io import BytesIO
    # TODO (dbczumar): Replace HTTP resolution via ``requests.get`` with an invocation of
    # ```mlflow.data.download_uri()`` when the API supports the same set of available stores as
    # the artifact repository (Azure, FTP, etc). See the following issue:
    # https://github.com/mlflow/mlflow/issues/763.
    response = requests.get(uri)
    try:
        response.raise_for_status()
    except requests.HTTPError as error:
        raise ExecutionException("Unable to retrieve ZIP file. Reason: %s" % str(error))
    return BytesIO(response.content)


def _fetch_git_repo(uri, version, dst_dir):
    """
    Clone the git repo at ``uri`` into ``dst_dir``, checking out commit ``version`` (or defaulting
    to the head commit of the repository's master branch if version is unspecified).
    Assumes authentication parameters are specified by the environment, e.g. by a Git credential
    helper.
    """
    # We defer importing git until the last moment, because the import requires that the git
    # executable is availble on the PATH, so we only want to fail if we actually need it.
    import git
    repo = git.Repo.init(dst_dir)
    origin = repo.create_remote("origin", uri)
    origin.fetch()
    if version is not None:
        try:
            repo.git.checkout(version)
        except git.exc.GitCommandError as e:
            raise ExecutionException("Unable to checkout version '%s' of git repo %s"
                                     "- please ensure that the version exists in the repo. "
                                     "Error: %s" % (version, uri, e))
    else:
        repo.create_head("master", origin.refs.master)
        repo.heads.master.checkout()
    repo.submodule_update(init=True, recursive=True)


def _get_conda_env_name(conda_env_path, env_id=None):
    conda_env_contents = open(conda_env_path).read() if conda_env_path else ""
    if env_id:
        conda_env_contents += env_id
    return "mlflow-%s" % hashlib.sha1(conda_env_contents.encode("utf-8")).hexdigest()


def _get_conda_bin_executable(executable_name):
    """
    Return path to the specified executable, assumed to be discoverable within the 'bin'
    subdirectory of a conda installation.

    The conda home directory (expected to contain a 'bin' subdirectory) is configurable via the
    ``mlflow.projects.MLFLOW_CONDA_HOME`` environment variable. If
    ``mlflow.projects.MLFLOW_CONDA_HOME`` is unspecified, this method simply returns the passed-in
    executable name.
    """
    conda_home = os.environ.get(MLFLOW_CONDA_HOME)
    if conda_home:
        return os.path.join(conda_home, "bin/%s" % executable_name)
    # Use CONDA_EXE as per https://github.com/conda/conda/issues/7126
    if "CONDA_EXE" in os.environ:
        conda_bin_dir = os.path.dirname(os.environ["CONDA_EXE"])
        return os.path.join(conda_bin_dir, executable_name)
    return executable_name


def _get_or_create_conda_env(conda_env_path, env_id=None):
    """
    Given a `Project`, creates a conda environment containing the project's dependencies if such a
    conda environment doesn't already exist. Returns the name of the conda environment.
    :param conda_env_path: Path to a conda yaml file.
    :param env_id: Optional string that is added to the contents of the yaml file before
                   calculating the hash. It can be used to distinguish environments that have the
                   same conda dependencies but are supposed to be different based on the context.
                   For example, when serving the model we may install additional dependencies to the
                   environment after the environment has been activated.
    """
    conda_path = _get_conda_bin_executable("conda")
    try:
        process.exec_cmd([conda_path, "--help"], throw_on_error=False)
    except EnvironmentError:
        raise ExecutionException("Could not find Conda executable at {0}. "
                                 "Ensure Conda is installed as per the instructions at "
                                 "https://conda.io/projects/conda/en/latest/"
                                 "user-guide/install/index.html. "
                                 "You can also configure MLflow to look for a specific "
                                 "Conda executable by setting the {1} environment variable "
                                 "to the path of the Conda executable"
                                 .format(conda_path, MLFLOW_CONDA_HOME))
    (_, stdout, _) = process.exec_cmd([conda_path, "env", "list", "--json"])
    env_names = [os.path.basename(env) for env in json.loads(stdout)['envs']]
    project_env_name = _get_conda_env_name(conda_env_path, env_id)
    if project_env_name not in env_names:
        _logger.info('=== Creating conda environment %s ===', project_env_name)
        if conda_env_path:
            process.exec_cmd([conda_path, "env", "create", "-n", project_env_name, "--file",
                              conda_env_path], stream_output=True)
        else:
            process.exec_cmd(
                [conda_path, "create", "-n", project_env_name, "python"], stream_output=True)
    return project_env_name


def _maybe_set_run_terminated(active_run, status):
    """
    If the passed-in active run is defined and still running (i.e. hasn't already been terminated
    within user code), mark it as terminated with the passed-in status.
    """
    if active_run is None:
        return
    run_id = active_run.info.run_id
    cur_status = tracking.MlflowClient().get_run(run_id).info.status
    if RunStatus.is_terminated(cur_status):
        return
    tracking.MlflowClient().set_terminated(run_id, status)


def _get_entry_point_command(project, entry_point, parameters, storage_dir):
    """
    Returns the shell command to execute in order to run the specified entry point.
    :param project: Project containing the target entry point
    :param entry_point: Entry point to run
    :param parameters: Parameters (dictionary) for the entry point command
    :param storage_dir: Base local directory to use for downloading remote artifacts passed to
                        arguments of type 'path'. If None, a temporary base directory is used.
    """
    storage_dir_for_run = _get_storage_dir(storage_dir)
    _logger.info(
        "=== Created directory %s for downloading remote URIs passed to arguments of"
        " type 'path' ===",
        storage_dir_for_run)
    commands = []
    commands.append(
        project.get_entry_point(entry_point).compute_command(parameters, storage_dir_for_run))
    return commands


def _run_entry_point(command, work_dir, experiment_id, run_id):
    """
    Run an entry point command in a subprocess, returning a SubmittedRun that can be used to
    query the run's status.
    :param command: Entry point command to run
    :param work_dir: Working directory in which to run the command
    :param run_id: MLflow run ID associated with the entry point execution.
    """
    env = os.environ.copy()
    env.update(_get_run_env_vars(run_id, experiment_id))
    _logger.info("=== Running command '%s' in run with ID '%s' === ", command, run_id)
    # in case os name is not 'nt', we are not running on windows. It introduces
    # bash command otherwise.
    if os.name != "nt":
        process = subprocess.Popen(["bash", "-c", command], close_fds=True, cwd=work_dir, env=env)
    else:
        process = subprocess.Popen(command, close_fds=True, cwd=work_dir, env=env)
    return LocalSubmittedRun(run_id, process)


def _build_mlflow_run_cmd(
        uri, entry_point, storage_dir, use_conda, run_id, parameters):
    """
    Build and return an array containing an ``mlflow run`` command that can be invoked to locally
    run the project at the specified URI.
    """
    mlflow_run_arr = ["mlflow", "run", uri, "-e", entry_point, "--run-id", run_id]
    if storage_dir is not None:
        mlflow_run_arr.extend(["--storage-dir", storage_dir])
    if not use_conda:
        mlflow_run_arr.append("--no-conda")
    for key, value in parameters.items():
        mlflow_run_arr.extend(["-P", "%s=%s" % (key, value)])
    return mlflow_run_arr


def _run_mlflow_run_cmd(mlflow_run_arr, env_map):
    """
    Invoke ``mlflow run`` in a subprocess, which in turn runs the entry point in a child process.
    Returns a handle to the subprocess. Popen launched to invoke ``mlflow run``.
    """
    final_env = os.environ.copy()
    final_env.update(env_map)
    # Launch `mlflow run` command as the leader of its own process group so that we can do a
    # best-effort cleanup of all its descendant processes if needed
    if sys.platform == "win32":
        return subprocess.Popen(
            mlflow_run_arr, env=final_env, universal_newlines=True,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    else:
        return subprocess.Popen(
            mlflow_run_arr, env=final_env, universal_newlines=True, preexec_fn=os.setsid)


def _create_run(uri, experiment_id, work_dir, entry_point):
    """
    Create a ``Run`` against the current MLflow tracking server, logging metadata (e.g. the URI,
    entry point, and parameters of the project) about the run. Return an ``ActiveRun`` that can be
    used to report additional data about the run (metrics/params) to the tracking server.
    """
    if _is_local_uri(uri):
        source_name = tracking._tracking_service.utils._get_git_url_if_present(_expand_uri(uri))
    else:
        source_name = _expand_uri(uri)
    source_version = _get_git_commit(work_dir)
    existing_run = fluent.active_run()
    if existing_run:
        parent_run_id = existing_run.info.run_id
    else:
        parent_run_id = None

    tags = {
        MLFLOW_USER: _get_user(),
        MLFLOW_SOURCE_NAME: source_name,
        MLFLOW_SOURCE_TYPE: SourceType.to_string(SourceType.PROJECT),
        MLFLOW_PROJECT_ENTRY_POINT: entry_point
    }
    if source_version is not None:
        tags[MLFLOW_GIT_COMMIT] = source_version
    if parent_run_id is not None:
        tags[MLFLOW_PARENT_RUN_ID] = parent_run_id

    active_run = tracking.MlflowClient().create_run(experiment_id=experiment_id, tags=tags)
    return active_run


def _get_run_env_vars(run_id, experiment_id):
    """
    Returns a dictionary of environment variable key-value pairs to set in subprocess launched
    to run MLflow projects.
    """
    return {
        tracking._RUN_ID_ENV_VAR: run_id,
        tracking._TRACKING_URI_ENV_VAR: tracking.get_tracking_uri(),
        tracking._EXPERIMENT_ID_ENV_VAR: str(experiment_id),
    }


def _invoke_mlflow_run_subprocess(
        work_dir, entry_point, parameters, experiment_id, use_conda, storage_dir, run_id):
    """
    Run an MLflow project asynchronously by invoking ``mlflow run`` in a subprocess, returning
    a SubmittedRun that can be used to query run status.
    """
    _logger.info("=== Asynchronously launching MLflow run with ID %s ===", run_id)
    mlflow_run_arr = _build_mlflow_run_cmd(
        uri=work_dir, entry_point=entry_point, storage_dir=storage_dir, use_conda=use_conda,
        run_id=run_id, parameters=parameters)
    mlflow_run_subprocess = _run_mlflow_run_cmd(
        mlflow_run_arr, _get_run_env_vars(run_id, experiment_id))
    return LocalSubmittedRun(run_id, mlflow_run_subprocess)


def _get_conda_command(conda_env_name):
    #  Checking for newer conda versions
    if os.name != 'nt' and ('CONDA_EXE' in os.environ or 'MLFLOW_CONDA_HOME' in os.environ):
        conda_path = _get_conda_bin_executable("conda")
        activate_conda_env = [
            'source {}/../etc/profile.d/conda.sh'.format(os.path.dirname(conda_path))
        ]
        activate_conda_env += ["conda activate {} 1>&2".format(conda_env_name)]
    else:
        activate_path = _get_conda_bin_executable("activate")
        # in case os name is not 'nt', we are not running on windows. It introduces
        # bash command otherwise.
        if os.name != "nt":
            return ["source %s %s 1>&2" % (activate_path, conda_env_name)]
        else:
            return ["conda activate %s" % (conda_env_name)]
    return activate_conda_env


def _validate_execution_environment(project, backend):
    if project.docker_env and backend == "databricks":
        raise ExecutionException(
            "Running docker-based projects on Databricks is not yet supported.")


def _get_local_uri_or_none(uri):
    if uri == "databricks":
        return None, None
    parsed_uri = urllib.parse.urlparse(uri)
    if not parsed_uri.netloc and parsed_uri.scheme in ("", "file", "sqlite"):
        path = urllib.request.url2pathname(parsed_uri.path)
        if parsed_uri.scheme == "sqlite":
            uri = path_to_local_sqlite_uri(_MLFLOW_DOCKER_TRACKING_DIR_PATH)
        else:
            uri = path_to_local_file_uri(_MLFLOW_DOCKER_TRACKING_DIR_PATH)
        return path, uri
    else:
        return None, None


def _get_docker_command(image, active_run, volumes=None, user_env_vars=None):
    docker_path = "docker"
    cmd = [docker_path, "run", "--rm"]
    env_vars = _get_run_env_vars(run_id=active_run.info.run_id,
                                 experiment_id=active_run.info.experiment_id)
    tracking_uri = tracking.get_tracking_uri()
    tracking_cmds, tracking_envs = _get_docker_tracking_cmd_and_envs(tracking_uri)
    artifact_cmds, artifact_envs = \
        _get_docker_artifact_storage_cmd_and_envs(active_run.info.artifact_uri)

    cmd += tracking_cmds + artifact_cmds
    env_vars.update(tracking_envs)
    env_vars.update(artifact_envs)
    if user_env_vars is not None:
        for user_entry in user_env_vars:
            if isinstance(user_entry, list):
                # User has defined a new environment variable for the docker environment
                env_vars[user_entry[0]] = user_entry[1]
            else:
                # User wants to copy an environment variable from system environment
                system_var = os.environ.get(user_entry)
                if system_var is None:
                    raise MlflowException(
                        "This project expects the %s environment variables to "
                        "be set on the machine running the project, but %s was "
                        "not set. Please ensure all expected environment variables "
                        "are set" % (", ".join(user_env_vars), user_entry))
                env_vars[user_entry] = system_var

    if volumes is not None:
        for v in volumes:
            cmd += ["-v", v]

    for key, value in env_vars.items():
        cmd += ["-e", "{key}={value}".format(key=key, value=value)]
    cmd += [image.tags[0]]
    return cmd


def _validate_docker_installation():
    """
    Verify if Docker is installed on host machine.
    """
    try:
        docker_path = "docker"
        process.exec_cmd([docker_path, "--help"], throw_on_error=False)
    except EnvironmentError:
        raise ExecutionException("Could not find Docker executable. "
                                 "Ensure Docker is installed as per the instructions "
                                 "at https://docs.docker.com/install/overview/.")


def _validate_docker_env(project):
    if not project.name:
        raise ExecutionException("Project name in MLProject must be specified when using docker "
                                 "for image tagging.")
    if not project.docker_env.get('image'):
        raise ExecutionException("Project with docker environment must specify the docker image "
                                 "to use via an 'image' field under the 'docker_env' field.")


def _parse_kubernetes_config(backend_config):
    """
    Creates build context tarfile containing Dockerfile and project code, returning path to tarfile
    """
    if not backend_config:
        raise ExecutionException("Backend_config file not found.")
    kube_config = backend_config.copy()
    if 'kube-job-template-path' not in backend_config.keys():
        raise ExecutionException("'kube-job-template-path' attribute must be specified in "
                                 "backend_config.")
    kube_job_template = backend_config['kube-job-template-path']
    if os.path.exists(kube_job_template):
        with open(kube_job_template, 'r') as job_template:
            yaml_obj = yaml.safe_load(job_template.read())
        kube_job_template = yaml_obj
        kube_config['kube-job-template'] = kube_job_template
    else:
        raise ExecutionException("Could not find 'kube-job-template-path': {}".format(
            kube_job_template))
    if 'kube-context' not in backend_config.keys():
        _logger.debug("Could not find kube-context in backend_config."
                      " Using current context or in-cluster config.")
    if 'repository-uri' not in backend_config.keys():
        raise ExecutionException("Could not find 'repository-uri' in backend_config.")
    return kube_config


def _create_docker_build_ctx(work_dir, dockerfile_contents):
    """
    Creates build context tarfile containing Dockerfile and project code, returning path to tarfile
    """
    directory = tempfile.mkdtemp()
    try:
        dst_path = os.path.join(directory, "mlflow-project-contents")
        shutil.copytree(src=work_dir, dst=dst_path)
        with open(os.path.join(dst_path, _GENERATED_DOCKERFILE_NAME), "w") as handle:
            handle.write(dockerfile_contents)
        _, result_path = tempfile.mkstemp()
        file_utils.make_tarfile(
            output_filename=result_path,
            source_dir=dst_path, archive_name=_PROJECT_TAR_ARCHIVE_NAME)
    finally:
        shutil.rmtree(directory)
    return result_path


def _build_docker_image(work_dir, repository_uri, base_image, run_id):
    """
    Build a docker image containing the project in `work_dir`, using the base image.
    """
    image_uri = _get_docker_image_uri(repository_uri=repository_uri, work_dir=work_dir)
    dockerfile = (
        "FROM {imagename}\n"
        "COPY {build_context_path}/ {workdir}\n"
        "WORKDIR {workdir}\n"
    ).format(imagename=base_image,
             build_context_path=_PROJECT_TAR_ARCHIVE_NAME,
             workdir=_MLFLOW_DOCKER_WORKDIR_PATH)
    build_ctx_path = _create_docker_build_ctx(work_dir, dockerfile)
    with open(build_ctx_path, 'rb') as docker_build_ctx:
        _logger.info("=== Building docker image %s ===", image_uri)
        client = docker.from_env()
        image, _ = client.images.build(
            tag=image_uri, forcerm=True,
            dockerfile=posixpath.join(_PROJECT_TAR_ARCHIVE_NAME, _GENERATED_DOCKERFILE_NAME),
            fileobj=docker_build_ctx, custom_context=True, encoding="gzip")
    try:
        os.remove(build_ctx_path)
    except Exception:  # pylint: disable=broad-except
        _logger.info("Temporary docker context file %s was not deleted.", build_ctx_path)
    tracking.MlflowClient().set_tag(run_id,
                                    MLFLOW_DOCKER_IMAGE_URI,
                                    image_uri)
    tracking.MlflowClient().set_tag(run_id,
                                    MLFLOW_DOCKER_IMAGE_ID,
                                    image.id)
    return image


def _get_docker_image_uri(repository_uri, work_dir):
    """
    Returns an appropriate Docker image URI for a project based on the git hash of the specified
    working directory.

    :param repository_uri: The URI of the Docker repository with which to tag the image. The
                           repository URI is used as the prefix of the image URI.
    :param work_dir: Path to the working directory in which to search for a git commit hash
    """
    repository_uri = repository_uri if repository_uri else "docker-project"
    # Optionally include first 7 digits of git SHA in tag name, if available.
    git_commit = _get_git_commit(work_dir)
    version_string = ":" + git_commit[:7] if git_commit else ""
    return repository_uri + version_string


def _get_local_artifact_cmd_and_envs(artifact_repo):
    artifact_dir = artifact_repo.artifact_dir
    container_path = artifact_dir
    if not os.path.isabs(container_path):
        container_path = os.path.join(_MLFLOW_DOCKER_WORKDIR_PATH, container_path)
        container_path = os.path.normpath(container_path)
    abs_artifact_dir = os.path.abspath(artifact_dir)
    return ["-v", "%s:%s" % (abs_artifact_dir, container_path)], {}


def _get_s3_artifact_cmd_and_envs(artifact_repo):
    # pylint: disable=unused-argument
    aws_path = posixpath.expanduser("~/.aws")

    volumes = []
    if posixpath.exists(aws_path):
        volumes = ["-v", "%s:%s" % (str(aws_path), "/.aws")]
    envs = {
        "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY"),
        "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID"),
        "MLFLOW_S3_ENDPOINT_URL": os.environ.get("MLFLOW_S3_ENDPOINT_URL")
    }
    envs = dict((k, v) for k, v in envs.items() if v is not None)
    return volumes, envs


def _get_azure_blob_artifact_cmd_and_envs(artifact_repo):
    # pylint: disable=unused-argument
    envs = {
        "AZURE_STORAGE_CONNECTION_STRING": os.environ.get("AZURE_STORAGE_CONNECTION_STRING"),
        "AZURE_STORAGE_ACCESS_KEY": os.environ.get("AZURE_STORAGE_ACCESS_KEY")
    }
    envs = dict((k, v) for k, v in envs.items() if v is not None)
    return [], envs


def _get_gcs_artifact_cmd_and_envs(artifact_repo):
    # pylint: disable=unused-argument
    cmds = []
    envs = {}

    if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
        credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
        cmds = ["-v", "{}:/.gcs".format(credentials_path)]
        envs["GOOGLE_APPLICATION_CREDENTIALS"] = "/.gcs"
    return cmds, envs


def _get_hdfs_artifact_cmd_and_envs(artifact_repo):
    # pylint: disable=unused-argument
    cmds = []
    envs = {
        "MLFLOW_HDFS_DRIVER": os.environ.get("MLFLOW_HDFS_DRIVER"),
        "MLFLOW_KERBEROS_TICKET_CACHE": os.environ.get("MLFLOW_KERBEROS_TICKET_CACHE"),
        "MLFLOW_KERBEROS_USER": os.environ.get("MLFLOW_KERBEROS_USER"),
        "MLFLOW_PYARROW_EXTRA_CONF": os.environ.get("MLFLOW_PYARROW_EXTRA_CONF")
    }
    envs = dict((k, v) for k, v in envs.items() if v is not None)

    if "MLFLOW_KERBEROS_TICKET_CACHE" in envs:
        ticket_cache = envs["MLFLOW_KERBEROS_TICKET_CACHE"]
        cmds = ["-v", "{}:{}".format(ticket_cache, ticket_cache)]
    return cmds, envs


_artifact_storages = {
    LocalArtifactRepository: _get_local_artifact_cmd_and_envs,
    S3ArtifactRepository: _get_s3_artifact_cmd_and_envs,
    AzureBlobArtifactRepository: _get_azure_blob_artifact_cmd_and_envs,
    HdfsArtifactRepository: _get_hdfs_artifact_cmd_and_envs,
    GCSArtifactRepository: _get_gcs_artifact_cmd_and_envs,
}


def _get_docker_artifact_storage_cmd_and_envs(artifact_uri):
    artifact_repo = get_artifact_repository(artifact_uri)
    _get_cmd_and_envs = _artifact_storages.get(type(artifact_repo))
    if _get_cmd_and_envs is not None:
        return _get_cmd_and_envs(artifact_repo)
    else:
        return [], {}


def _get_docker_tracking_cmd_and_envs(tracking_uri):
    cmds = []
    env_vars = dict()

    local_path, container_tracking_uri = _get_local_uri_or_none(tracking_uri)
    if local_path is not None:
        cmds = ["-v", "%s:%s" % (local_path, _MLFLOW_DOCKER_TRACKING_DIR_PATH)]
        env_vars[tracking._TRACKING_URI_ENV_VAR] = container_tracking_uri
    if is_databricks_uri(tracking_uri):
        db_profile = get_db_profile_from_uri(tracking_uri)
        config = databricks_utils.get_databricks_host_creds(db_profile)
        # We set these via environment variables so that only the current profile is exposed, rather
        # than all profiles in ~/.databrickscfg; maybe better would be to mount the necessary
        # part of ~/.databrickscfg into the container
        env_vars[tracking._TRACKING_URI_ENV_VAR] = 'databricks'
        env_vars['DATABRICKS_HOST'] = config.host
        if config.username:
            env_vars['DATABRICKS_USERNAME'] = config.username
        if config.password:
            env_vars['DATABRICKS_PASSWORD'] = config.password
        if config.token:
            env_vars['DATABRICKS_TOKEN'] = config.token
        if config.ignore_tls_verification:
            env_vars['DATABRICKS_INSECURE'] = config.ignore_tls_verification
    return cmds, env_vars


__all__ = [
    "run",
    "SubmittedRun"
]
